-
Notifications
You must be signed in to change notification settings - Fork 124
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add direct consumer support #169
Conversation
…dules and init args for direct consumer mode.
Hi @sircinek. Thanks for PR. I need to give a proper look later but it looks to be a good addition. Before that...
Could you remove these changes from this PR? These should be discussed in separate PR(s) and I would like you to include only amqp_direct_consumer changes in this PR 🙏 |
Hey @ono !
Absolutely, reverted all unrelated changes. Do you want me to open a separate PR, or you want to take them in smaller chunks ? |
Great. Let's get this PR done at first. I am hoping to make some time for this PR soon. Bear with me. |
|
||
@doc """ | ||
Opens a new Channel in a previously opened Connection. | ||
|
||
## Options | ||
* `:consumer_type` - specifies the type of consumer used as callback module. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe this should be :default
or :custom
as currently :direct
option lets you pass anything there.
It could also be more restrictive and allow only default callback module (DirectReceiver
) implementation - which was my original intention - then this control flag had more sense. @ono what do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
:default
and :direct
might be my preference. I am not comfortable with :custom
sets DirectReceiver as a default consumer. :direct
makes more sense to me. we can introduce :custom
in future if required as an additional option.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The other option is to eliminate this option and take {consumer_module, args}
as a tuple like Erlang library does. Then make the following changes:
- Rename
DirectReciver
toDirectConsumer
- Add a proper documentation to
DirectConsumer
module
Then the user can use it simply like this:
chan = AMQP.Channel.open(conn) # call :amqp_connection.open_channel/1
chan = AMQP.Channel.open(conn, {AMQP.DirectConsumer, self()}) # call :amqp_connection.open_channel/2
This approach makes more sense to me as direct or non-direct is not a right concern on channel layer. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will do the latter, as it is better aligned with how Erlang library works.
Side note - maybe we would like to change Receiver
to implement amqp_gen_consumer
and be a callback module substituting default amqp_selective_receiver
as it more or less does the same thing ? As a separate PR ofc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The main purpose of Receiver is...
- to convert the data from Erlang library(Record) to Elixir friendly data type(Map)
- to receive messages for a channel in a single process so we can ensure the order of messages (deliver, returns etc.)
Now... I am a bit lost of the purpose of this PR 😓
To align 100%, could you write down an example code with direct_consumer and how it solves the problems?
Hi, is there any progress in this? We could also use this functionality. |
Yes, sorry I have been bit too busy with other stuff but should be able to look into it in this week. Thanks for your patience 🙏 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am really sorry about my slow response. I have been occupied with other business.
Great work 👍 I am keen on adding this to the project. Require a few changes so please check out my comments in code.
Thanks!
|
||
@doc """ | ||
Opens a new Channel in a previously opened Connection. | ||
|
||
## Options | ||
* `:consumer_type` - specifies the type of consumer used as callback module. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
:default
and :direct
might be my preference. I am not comfortable with :custom
sets DirectReceiver as a default consumer. :direct
makes more sense to me. we can introduce :custom
in future if required as an additional option.
|
||
case :amqp_channel.subscribe(chan.pid, basic_consume, receiver.pid) do | ||
:direct -> | ||
consumer_pid |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we still need to subscribe direct consumer pid? Isn't it subscribed automatically when channel is opened?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AFAIU, amqp_channel.subscribe/3
starts a consumer for a queue that is bound to specific channel (and shares its lifetime (or until cancelled)), with opts specified in #basic_consume{}
. The pid is in fact ignored by our (and rabbitmq) implementation of direct consumer, but the server method needs to be sent anyway.
Basically we can pass any pid there, but it doesn't make sense to start extra receiver as it will not get any messages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you want to subscribe a channel when you have a direct consumer for it?
@@ -19,100 +19,124 @@ defmodule BasicTest do | |||
assert :ok = Basic.publish(meta[:chan], "", "", "ping") | |||
end | |||
|
|||
test "basic return", meta do | |||
:ok = Basic.return(meta[:chan], self()) | |||
for cons <- [:selective, :direct] do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is making test's readability lower. Can we leave the existing tests as they are? Then add couple tests which test direct consumer behaviours? It can be a separate test file like direct_consumer_test.exs
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to clarify - I am not suggesting copy paste all tests to direct_consumer_test.exs
. Probably a good idea to write a test for direct consumer which covers open channel, send and consume a message, close a channel etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, will do. I just didn't want to duplicate code, as existing tests could cover both cases. I do agree though on readability argument.
What's the state of this PR? I'd gladly take it over as we still could really use that change. @sircinek ? |
this work is handed over to #172 |
This PR aims at adding support for direct consumer.
It provides default callback module implementing
amqp_gen_consumer
behavior, based on erlang client libraryamqp_direct_consumer
.This can be useful when you want to maintain 1 to 1 relationship, between channel and consumer.
It also puts your consumer directly under amqp tree, making it a little less error/race condition prone, as well as simplifying message flow between channel and user defined consumer (1 layer less).
PR consists of some hygiene changes, updating deprecated functions and addressing dialyzer/credo issues as well as bumping up dependencies versions.